package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.model.article.vos.ArticleVisitStreamMess;
import com.heima.model.behavoir.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;


@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    // 1. 设置窗口时间（多少秒进行合并和发送）
    // 2. 分组（文章id）
    // 3. 累加

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-in");
        stream.map((key, value) -> {
            // 1. 因为我后面要根据articleId进行分组
            // 先把articleId从JSON结构里面提取出来，放到key这个位置
            // 为什么放在key？因为key这个位置，在目前这个阶段是没用的
            if (StringUtils.isEmpty(value)) {
                log.warn("value为空");
                return new KeyValue<>(key, value);
            }

            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            if (mess == null) {
                log.warn("value为空2");
                return new KeyValue<>(key, value);
            }

            Long articleId = mess.getArticleId();
            Integer add = mess.getAdd();
            UpdateArticleMess.UpdateArticleType type = mess.getType();

            return new KeyValue<>(articleId.toString(), type + ":" + add);
        })
                .groupBy((key, value) -> key)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .aggregate(
                        // 声明一个bucket（容器）
                        new Initializer<String>() {
                            @Override
                            public String apply() {
                                return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                            }
                        },

                        // 累加
                        new Aggregator<String, String, String>() {
                            @Override
                            public String apply(String key, String value, String initData) {
                                // key：articleId
                                // value: type + add
                                // initData: COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0

                                // 1. 判断本次消息是什么类型

                                String[] valueArray = value.split(":");

                                // 类型：COLLECTION,COMMENT,LIKES,VIEWS
                                String type = valueArray[0];
                                // 数值
                                Integer add = Integer.parseInt(valueArray[1]);

                                // 2. 根据类型去更新对应的数值
                                // 2.1 将字符串结构转成Map结构，key为对应类型 value为类型当前的数值
                                Map<String, Integer> initMap = string2Map(initData);

                                // 本次操作类型所对应的总值
                                Integer totalAmount = initMap.get(type);
                                int newAmount = totalAmount + add;
                                // 2.2 根据本次操作的type去map里面匹配数值，并且对数值进行+1/-1操作
                                initMap.put(type, newAmount);

                                // 2.3 将map转回字符串结构
                                String result = map2String(initMap);
                                if (StringUtils.isEmpty(result)) {
                                    log.warn("出现异常啦");
                                    return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                                }

                                return result;
                            }
                        },

                        // 合成器
                        Materialized.as("aaa")
                )
                .toStream()
                .map((key, value) -> {
                    return new KeyValue<>(key.key().toString(), stringToJsonVo(value, Long.parseLong(key.key())));
                })
                .to("itcast-topic-out");

        return stream;
    }

    private String stringToJsonVo(String str, Long articleId) {
        // 1. 收到字符串 "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0"

        // 2. 判空
        if (StringUtils.isEmpty(str)) {
            return "";
        }

        // 3. 解析字符串 ->转成map结构
        Map<String, Integer> map = string2Map(str);

        // 4. 判断类型，根据不同的类型，累加到对应的实体类字段上
        Integer collection = map.get("COLLECTION");
        Integer comment = map.get("COMMENT");
        Integer likes = map.get("LIKES");
        Integer views = map.get("VIEWS");

        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        if (collection != null) {
            mess.setCollect(collection);
        }

        if (comment != null) {
            mess.setComment(comment);
        }

        if (likes != null) {
            mess.setLike(likes);
        }

        if (views != null) {
            mess.setView(views);
        }

        mess.setArticleId(articleId);

        return JSON.toJSONString(mess);
    }


    private String map2String(Map<String, Integer> map) {
        if (CollectionUtils.isEmpty(map)) {
            return null;
        }

        // 1. 得到map里面所有的key
        Set<String> keySet = map.keySet();

        StringBuilder builder = new StringBuilder();

        // 2. 遍历key，获取他们对应的值
        for (String key : keySet) {
            if (StringUtils.isEmpty(key)) {
                continue;
            }

            Integer value = map.get(key);

            builder.append(key).append(":").append(value).append(",");
        }

        // 3. StringBuilder，将多组值组合起来
        // COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0,
        String finalString = builder.toString();

        return finalString.substring(0, finalString.length() - 1);
    }

    private Map<String, Integer> string2Map(String str) {
        // 1. 接收COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        Map<String, Integer> map = new HashMap<>();

        // 2. 判空
        if (StringUtils.isEmpty(str)) {
            return new HashMap<>();
        }

        // 3. 先用，进行拆分 -> array
        String[] arr1 = str.split(",");
        if (ArrayUtils.isEmpty(arr1)) {
            return new HashMap<>();
        }

        for (String item : arr1) {
            if (StringUtils.isEmpty(item)) {
                continue;
            }

            // 在这个阶段：你得到值是这样的形式 COLLECTION:0
            String[] arr2 = item.split(":");
            if (ArrayUtils.isEmpty(arr2)) {
                continue;
            }

            // 获得对应的key和value
            String mapKey = arr2[0];
            Integer mapValue = Integer.parseInt(arr2[1]);

            map.put(mapKey, mapValue);
        }

        return map;
    }

}